linux 之 shell脚本实现SFTP下载、上传文件、执行sftp命令 您所在的位置:网站首页 sftp 获取文件大小的命令 linux 之 shell脚本实现SFTP下载、上传文件、执行sftp命令

linux 之 shell脚本实现SFTP下载、上传文件、执行sftp命令

2024-07-16 16:58| 来源: 网络整理| 查看: 265

需求

需求方通过sftp不定时的上传一批用户(SBXDS_ACC_M_任务ID_yyyymmddHHMMSS.csv),需要我们从这些用户中找出满足条件的用户。然后把这些结果用户通过文件的形式上传到ftp。

环境说明

ip1能连接hive库环境,不能连接sftp。ip2不能连接hive库环境,能连接sftp。ip1和ip2是共享盘,能同时访问公共目录。

目录规划

源文件名: SBXDS_ACC_M_任务ID_yyyymmddHHMMSS.csv (例:SBXDS_ACC_M_test001_20240201103828.csv)结果文件名: WTF_YBZ_DSGS_任务id.csv (例:WTF_YBZ_DSGS_test001.csv)本地路径: /date/localPath/SBXDS_ACC_Msftp下载路径(源文件):/ftpdata/preciseUploadsftp上传路径(结果文件):/ftpdata/orderringlist

处理逻辑

1.通过sftp获取preciseUpload目录下,当天的 SBXDS_ACC_M_*_YYYYMMDD*.csv 文件名,并写入本地文件SBXDS_ACC_M.txt2.比对SBXDS_ACC_M.txt里面的文件是否在SBXDS_ACC_M目录下存在,不存在则将文件名写入SBXDS_ACC_M_NEWFILES.txt。3.获取SBXDS_ACC_M_NEWFILE.txt里面的文件,并写入hive表 dods_ftp_target_users partition(task_id)。4.目标用户与通信圈用户匹配,跑出结果数据dapp_ftp_calling_txq_users partition(task_id)。5.结果数据写入文件WTF_YBZ_DSGS_task_id.csv,并上传sftp6.为了防止文件正在传输,内容未传输完,需求方就把文件取走了,导致漏数据的情况。在文件上传之后,对文件进行重命名处理。

hive表 -- 目标用户hive表 create table ods_target_users( phone_no string ) -- 结果数据hive表 create table dapp_target_result_users( phone_no string, msg_info string ) 脚本执行顺序

sh ftp_getTodayFiles.sh 20240201 # ip1执行,文件处理sh ftp_getTxq_users.sh # ip2 执行,跑SQLsh ftp_sftpPutFiles.sh # ip1 执行,文件处理

脚本 #!/bin/bash # ftp_getTodayFiles.sh # ip2 执行 # 调用方式 sh ftp_getTodayFiles.sh 20240201 # 定义函数,获取文件名称 function funcGetTodayFilesName(){ dateNo=$1 PORT=22 HOST="ip1" USERNAME="uname1" PASSWORD="123456" LOCALDIR="/date/localPath/SBXDS_ACC_M" SFTPDIR="/ftpdata/preciseUpload" FILENAME="SBXDS_ACC_M_*_$dateNo*" cd $LOCALDIR /usr/bin/expect SBXDS_ACC_M_FILES.txt echo "获取sftp目录下当天文件名称结束" dayfiles=$(cat SBXDS_ACC_M_FILES.txt|wc -l) if [ $dayfiles -eq 0 ]; then echo "sftp今日无文件。" exit else echo "当日所有文件:" cat SBXDS_ACC_M_FILES.txt fi echo "清空文件 SBXDS_ACC_M_NEWFILES.txt" echo "判断本地是否存在文件,不存在则将文件名写入文件SBXDS_ACC_M_NEWFILES.txt" > SBXDS_ACC_M_NEWFILES.txt for filename in $(cat SBXDS_ACC_M_FILES.txt|sed 's/\r$//') do if [ -e $filename ]; then echo "$filename 已存在" else echo "$filename 新文件" echo $filename >> SBXDS_ACC_M_NEWFILES.txt fi done newfiles=$(cat SBXDS_ACC_M_NEWFILES.txt|wc -l) if [ $newfiles -eq 0 ]; then echo "无新增文件。" exit else echo "文件名写入SBXDS_ACC_M_NEWFILES.txt结束" echo "当日新增文件$newfiles个:" cat SBXDS_ACC_M_NEWFILES.txt fi # 调用函数,sftp获取文件 echo "开始下载文件...." LOCALDIR="/date/localPath/SBXDS_ACC_M" cd $LOCALDIR for filename in $(cat SBXDS_ACC_M_NEWFILES.txt|sed 's/\r$//') do echo "下载$filename..." funcSftpGetFile $filename done echo "下载文件结束,文件名如下:" cat SBXDS_ACC_M_NEWFILES.txt # 验证 SBXDS_ACC_M_NEWFILES.txt newfiles2=$(cat SBXDS_ACC_M_NEWFILES.txt|wc -l) if [ $newfiles2 -eq 0 ]; then echo "文件异常!!!" exit 1 fi ftp_getTodayFiles.sh #!/bin/bash # ftp_getTxq_users.sh # ip1 执行 # 把SBXDS_ACC_M_NEWFILE.txt里面的文件写入hive表 dods_ftp_target_users partition(task_id)。 # 匹配通信圈用户 dapp_ftp_txq_users # 结果数据导入文件WTF_YBZ_DSGS_task_id # 调用格式: sh ftp_getTxq_users.sh 20240114 dateNo=$1 LOCALDIR="/date/localPath/SBXDS_ACC_M" cd $LOCALDIR newfiles=$(cat SBXDS_ACC_M_NEWFILES.txt|wc -l) if [ $newfiles -eq 0 ]; then echo "无新增文件。" exit fi fileList=() for filename in $(cat SBXDS_ACC_M_NEWFILES.txt|sed 's/\r$//') do task_id=$(echo $filename|cut -d'_' -f4) echo "文件名: $filename,任务ID: $task_id" echo "执行命令:hadoop fs -mkdir hdfs://ns2/data//dods/dods_ftp_target_users/task_id=$task_id" hadoop fs -mkdir hdfs://ns2/data//dods/dods_ftp_target_users/task_id=$task_id if [ $? -eq 0 ]; then echo "执行命令:hadoop fs -put -f $filename hdfs://ns2/data//dods/dods_ftp_target_users/task_id=$task_id" hadoop fs -put -f $filename hdfs://ns2/data//dods/dods_ftp_target_users/task_id=$task_id else echo "分区 $task_id 已存在。" echo "执行命令:hadoop fs -put -f $filename hdfs://ns2/data//dods/dods_ftp_target_users/task_id=$task_id" hadoop fs -put -f $filename hdfs://ns2/data//dods/dods_ftp_target_users/task_id=$task_id fi # 匹配通信圈用户 hive -e" MSCK REPAIR TABLE dods_ftp_target_users; insert overwrite table dapp_ftp_txq_users partition(task_id='${task_id}') select a.phone_no, b.msg_info from dods_ftp_target_users a, dods_msg_info b where a.phone_no=b.phone_no and a.task_id='${task_id}' and b.deal_day='${dateNo}' group by a.phone_no, b.msg_info " # 结果数据导入文件 hive -e" select concat_ws(',',phone_no,msg_info) from dapp_ftp_txq_users where task_id = '${task_id}' " > WTF_YBZ_DSGS_${task_id} # 文件记录数 file_rows=$(cat WTF_YBZ_DSGS_${task_id}|wc -l) msgs="源文件$filename的结果文件WTF_YBZ_DSGS_${task_id},记录数$file_rows。" fileList+=$msgs done echo "************************* 执行成功 *************************" for msg in ${fileList[*]} do echo $msg done ftp_getTxq_users.sh #!/bin/bash # ftp_sftpPutFiles.sh # ip2 执行 # 调用格式 sh ftp_sftpPutFiles.sh # 定义函数,sftp上传文件 function sftpPutFiles(){ FILENAME=$1 PORT=22 HOST="ip1" USERNAME="uname1" PASSWORD="123456" LOCALDIR="/date/localPath/SBXDS_ACC_M" SFTPDIR="/ftpdata/orderringlist" cd $LOCALDIR /usr/bin/expect


【本文地址】

公司简介

联系我们

今日新闻

    推荐新闻

    专题文章
      CopyRight 2018-2019 实验室设备网 版权所有